use crate::source::{Source, SourceResult, SourceFormat};
use crate::conf_init::{MapResult, ResultErr};
use crate::data_frame::{DataInstance, Data};
use std::future::Future;
use tokio::fs::File;
use tokio::io::{BufReader, AsyncBufReadExt, Lines};
use crate::data_frame::json::JsonValue;
use async_trait::async_trait;
use std::sync::Arc;

/// A `Source` implementation that reads its records from a file, one line at a time.
///
/// The file is not opened at construction time; `recv` opens it on the first
/// call and keeps the line reader in `lines` for subsequent calls.
pub struct SourceFile{
    /// Path handed to `File::open` when the file is first needed.
    file_name: String,
    /// Format given at construction time. It is stored but not consulted by
    /// the code in this file; `recv` always parses each line with `JsonValue::new`.
    format: SourceFormat,
    /// Line reader over the opened file; `None` until the first `recv` call opens it.
    lines: Option<Lines<BufReader<File>>>,
}
/// Deserializable configuration for a [`SourceFile`].
#[derive(Deserialize)]
pub struct SourceFileConf{
    /// Path of the file to read; copied into the `SourceFile` by `SourceFile::new`.
    file_name: String,
}
impl SourceFile {
    /// Builds a boxed file-backed source from a format and its configuration.
    ///
    /// Both `format` and the configured file name are cloned into the new
    /// value. No I/O happens here: the line reader starts out as `None`, so
    /// this constructor always yields `Ok`.
    pub fn new(format: &SourceFormat, cnf: &SourceFileConf) -> MapResult<Box<dyn Source>> {
        MapResult::Ok(Box::new(SourceFile {
            file_name: cnf.file_name.clone(),
            format: format.clone(),
            lines: None,
        }))
    }
}

#[async_trait]
impl Source for SourceFile {
    /// Produces the next record from the file.
    ///
    /// On the first call the file named by `file_name` is opened and wrapped
    /// in a buffered line reader, which is kept for later calls. Each call
    /// then reads one line, parses it with `JsonValue::new`, and wraps the
    /// result as `Data::DataFrame` inside a `DataInstance`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - the file cannot be opened (`File::open err ...`),
    /// - reading the next line fails (`SourceFile next_line err ...`),
    /// - the reader yields no further line (`SourceFile next_line None`),
    /// - `JsonValue::new` rejects the line (propagated with `?`).
    async fn recv(&mut self) -> SourceResult {
        loop {
            // Reader already available: read exactly one line and finish.
            if let Some(reader) = self.lines.as_mut() {
                return match reader.next_line().await {
                    Ok(Some(text)) => {
                        let parsed = JsonValue::new(&text)?;
                        SourceResult::Ok(DataInstance::new(Data::DataFrame(parsed)))
                    }
                    Ok(None) => SourceResult::Err(ResultErr::new(String::from(
                        "SourceFile next_line None",
                    ))),
                    Err(e) => SourceResult::Err(ResultErr::new(format!(
                        "SourceFile next_line err {:?}",
                        e
                    ))),
                };
            }

            // No reader yet: open the file, store the reader, and go around
            // the loop once more to perform the actual read.
            match File::open(&self.file_name).await {
                Ok(file) => {
                    self.lines = Some(BufReader::new(file).lines());
                }
                Err(e) => {
                    return SourceResult::Err(ResultErr::new(format!(
                        "File::open err {:?}",
                        e
                    )));
                }
            }
        }
    }
}
